Support sorted series column for DF metrics #6347

Merged
alexanderbianchi merged 2 commits into quickwit-oss:main from alexanderbianchi:bianchi/sorted-series
Apr 29, 2026
@alexanderbianchi alexanderbianchi commented Apr 27, 2026

Summary

This PR wires the metrics DataFusion path to use the materialized sorted_series column as the per-series grouping key for rollup-style queries. The metrics table provider now advertises split-local ordering by sorted_series when the metadata of every selected split declares the default metrics sort schema. The metrics runtime also registers a physical optimizer rule that replaces the sorted-series hash repartition with a sort-preserving merge into a single final aggregate.

It also keeps scan parallelism bounded to split/file partitions for metrics by disabling round-robin repartition injection and file-scan repartitioning in the metrics runtime registration.

Target Query Pattern

This targets the per-series aggregation shape used by metrics rollups, where sorted_series is the series handle and replaces the old context/points join on bhandle:

WITH bin_max AS (
   SELECT
       sorted_series AS bhdl,
       date_bin(INTERVAL '30 seconds', to_timestamp_seconds(timestamp_secs)) AS time_bin,
       service,
       MAX(value) AS max_bin_val
   FROM metrics
   WHERE metric_name = 'cpu.usage'
     AND env = 'prod'
   GROUP BY bhdl, time_bin, service
)
SELECT
   service,
   time_bin,
   AVG(max_bin_val) AS avg_val
FROM bin_max
GROUP BY service, time_bin;
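
To make the two-level shape concrete, here is a minimal in-memory sketch of what the query computes: a MAX per (series, 30-second bin, service), then an AVG of those maxima per (service, bin). The `Point` struct, the `u64` stand-in for sorted_series (the PR does not state the column type), and the helper names are assumptions for illustration only, not code from this PR.

```rust
use std::collections::BTreeMap;

/// One metrics data point. Field names mirror the query; the `u64` series
/// key is an assumed stand-in for the real sorted_series column type.
#[derive(Clone, Debug)]
struct Point {
    sorted_series: u64,
    timestamp_secs: i64,
    service: &'static str,
    value: f64,
}

/// Floor a timestamp to the start of its bin, similar in spirit to `date_bin`.
fn time_bin(timestamp_secs: i64, width_secs: i64) -> i64 {
    timestamp_secs.div_euclid(width_secs) * width_secs
}

/// Inner aggregate: MAX(value) per (sorted_series, time_bin, service).
fn bin_max(points: &[Point]) -> BTreeMap<(u64, i64, &'static str), f64> {
    let mut out = BTreeMap::new();
    for p in points {
        let key = (p.sorted_series, time_bin(p.timestamp_secs, 30), p.service);
        out.entry(key)
            .and_modify(|m: &mut f64| *m = m.max(p.value))
            .or_insert(p.value);
    }
    out
}

/// Outer aggregate: AVG(max_bin_val) per (service, time_bin).
fn avg_of_max(
    inner: &BTreeMap<(u64, i64, &'static str), f64>,
) -> BTreeMap<(&'static str, i64), f64> {
    let mut sums: BTreeMap<(&'static str, i64), (f64, u32)> = BTreeMap::new();
    for (&(_, bin, service), &max_val) in inner {
        let entry = sums.entry((service, bin)).or_insert((0.0, 0));
        entry.0 += max_val;
        entry.1 += 1;
    }
    sums.into_iter()
        .map(|(key, (sum, count))| (key, sum / f64::from(count)))
        .collect()
}
```

The inner map is keyed by the high-cardinality series handle, while the outer map is keyed only by (service, time_bin), which is why the two stages have different partitioning needs.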

Final Plan Shape

The important final physical shape is:

DistributedExec
  CoalescePartitionsExec
    ProjectionExec
      AggregateExec: mode=FinalPartitioned, gby=[service, time_bin], aggr=[avg(max_bin_val)]
        RepartitionExec: partitioning=Hash([service, time_bin], target_partitions)
          AggregateExec: mode=Partial, gby=[service, time_bin], aggr=[avg(max_bin_val)]
            ProjectionExec
              AggregateExec: mode=Final, gby=[sorted_series, time_bin, service], aggr=[max(value)]
                SortPreservingMergeExec: [sorted_series ASC NULLS LAST, ...]
                  [Stage 1] => NetworkCoalesceExec
                    SortExec: [sorted_series ASC NULLS LAST, ...], preserve_partitioning=true
                      AggregateExec: mode=Partial, gby=[sorted_series, time_bin, service], aggr=[max(value)]
                        PartitionIsolatorExec
                          DataSourceExec:
                            file_groups={one group per split}
                            output_ordering=[sorted_series ASC NULLS LAST, timestamp_secs DESC NULLS LAST]

The core change is that the inner per-series finalization no longer requires:

RepartitionExec: Hash([sorted_series, ...])

Instead, workers emit split-local partial aggregates, the coordinator coalesces the ordered worker streams, and SortPreservingMergeExec stitches them before a single final aggregate over sorted_series.

The later repartition by (service, time_bin) remains because sorted_series ordering does not imply partitioning by the outer aggregate key.
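
The merge-then-finalize step can be sketched roughly as a k-way merge over worker outputs that are each already sorted by the group key, folding equal keys together as they become adjacent. This is a toy model, not the DataFusion operators: the `(u64, i64)` key standing in for (sorted_series, time_bin), the `PartialRow` alias, and `merge_and_finalize` are all hypothetical names chosen for the example.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Group key standing in for (sorted_series, time_bin); the integer types
/// are assumptions made for this sketch.
type Key = (u64, i64);
/// A partial aggregate row: group key plus a partial MAX.
type PartialRow = (Key, f64);

/// Merge worker streams that are each sorted by `Key`, combining partial
/// maxima for equal keys as they become adjacent in the merged order.
fn merge_and_finalize(streams: Vec<Vec<PartialRow>>) -> Vec<PartialRow> {
    // Min-heap of (next key, stream index, position within that stream).
    let mut heap = BinaryHeap::new();
    for (i, stream) in streams.iter().enumerate() {
        if let Some(&(key, _)) = stream.first() {
            heap.push(Reverse((key, i, 0usize)));
        }
    }

    let mut out: Vec<PartialRow> = Vec::new();
    while let Some(Reverse((key, i, pos))) = heap.pop() {
        let value = streams[i][pos].1;
        match out.last_mut() {
            // Same group as the previous merged row: fold into it.
            Some(last) if last.0 == key => last.1 = last.1.max(value),
            // New group: everything before it is already final.
            _ => out.push((key, value)),
        }
        // Advance the stream we just consumed from.
        if let Some(&(next_key, _)) = streams[i].get(pos + 1) {
            heap.push(Reverse((next_key, i, pos + 1)));
        }
    }
    out
}
```

No row is ever routed by a hash of the key here; correctness relies only on every input stream being sorted by the same key.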

Intake / Storage Assumptions

This optimization assumes intake and split metadata agree on the physical sort contract:

  • metrics splits with sort_fields == ProductType::Metrics.default_sort_fields() were written using that sort schema
  • the writer materialized sorted_series from the same sort schema before sorting/reordering the batch
  • within each split, sorted_series is monotonically non-decreasing in the physical Parquet row order
  • timestamp_secs remains descending within a series under the default metrics sort schema
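
As an illustration of the last two bullets, a debug-style check over a split's rows might look like the sketch below. It assumes rows reduced to `(sorted_series, timestamp_secs)` pairs with a `u64` series key and ignores nulls; `satisfies_sort_contract` is a hypothetical helper, not something this PR adds.

```rust
/// Check the per-split ordering contract on (sorted_series, timestamp_secs)
/// rows: series keys never decrease, and timestamps never increase within
/// a run of equal series keys. Nulls are not modeled in this sketch.
fn satisfies_sort_contract(rows: &[(u64, i64)]) -> bool {
    rows.windows(2).all(|pair| {
        let (prev_series, prev_ts) = pair[0];
        let (series, ts) = pair[1];
        prev_series < series || (prev_series == series && prev_ts >= ts)
    })
}
```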

The table provider only advertises the sorted-series ordering when all selected splits have the default metrics sort_fields. Old splits with empty sort metadata or splits using custom sort fields do not get this advertised ordering, so the optimizer will not rely on a false ordering guarantee.
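
The gating condition amounts to an all-splits check. A minimal sketch, assuming a simplified `SplitMeta` with a string-valued `sort_fields` (the real metadata type and the value returned by `ProductType::Metrics.default_sort_fields()` are not shown in this description, so both are placeholders here):

```rust
/// Simplified stand-in for split metadata; only the sort fields matter here.
/// The `String` representation is an assumption for this sketch.
struct SplitMeta {
    sort_fields: String,
}

/// Advertise sorted-series ordering only if every selected split declares
/// exactly the default metrics sort fields. Splits with empty or custom
/// sort fields make the whole selection ineligible.
fn can_advertise_sorted_series(splits: &[SplitMeta], default_sort_fields: &str) -> bool {
    splits.iter().all(|s| s.sort_fields == default_sort_fields)
}
```

A single legacy split in the selection is enough to turn the advertised ordering off, which keeps the optimizer rule from firing on a guarantee that only holds for some files.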

Why Update Planning Rules

DataFusion can execute grouped aggregation in a streaming-friendly way when input is ordered by the group key. The default physical plan still inserted a hash repartition for the final aggregate over sorted_series, which turns the distributed plan into a shuffle even though the split output can be sorted and merge-stitched.
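
The streaming-friendly property can be illustrated with a small standalone sketch: when rows arrive sorted by the group key, only the current group needs to be kept open, and it can be emitted as soon as the key changes. `streaming_max` and the `(u64, f64)` row shape are assumptions for the example and do not reflect DataFusion's internal implementation.

```rust
/// Streaming MAX per key over input that is already sorted by key.
/// Only the current group is held open; it is emitted when the key changes.
fn streaming_max<I>(sorted_rows: I) -> Vec<(u64, f64)>
where
    I: IntoIterator<Item = (u64, f64)>,
{
    let mut out = Vec::new();
    let mut current: Option<(u64, f64)> = None;
    for (key, value) in sorted_rows {
        match current {
            // Still inside the same group: update its running maximum.
            Some((k, max)) if k == key => current = Some((k, max.max(value))),
            // Key changed: the previous group is final and can be emitted.
            Some(done) => {
                out.push(done);
                current = Some((key, value));
            }
            // First row of the input.
            None => current = Some((key, value)),
        }
    }
    // Flush the last open group, if any.
    out.extend(current);
    out
}
```

A hash-based aggregate, by contrast, has to keep every group open until its input is exhausted, because any later row might belong to any group.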

The metrics-specific physical rule rewrites only the sorted-series finalization pattern:

FinalPartitioned Aggregate(grouping includes sorted_series)
  SortExec(preserve_partitioning=true, ordering starts with sorted_series)
    RepartitionExec(Hash(... sorted_series ...))

into:

Final Aggregate(grouping includes sorted_series)
  SortPreservingMergeExec(ordering starts with sorted_series)
    SortExec(preserve_partitioning=true)
      original pre-hash child

That preserves correctness while avoiding the network/hash shuffle for the high-cardinality series key.
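
The rewrite itself can be modeled on a toy plan tree. The `Plan` enum below is not DataFusion's `ExecutionPlan` API; its variants only echo the node names in the shapes above, and `rewrite` matches the pattern at the root only, without traversing the rest of the tree. Treat it as a sketch of the pattern match under those assumptions.

```rust
/// Toy physical plan nodes, just enough to express the rewrite.
/// These are NOT DataFusion types; names only echo the plan shapes above.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    AggFinalPartitioned { group_by: Vec<String>, input: Box<Plan> },
    AggFinal { group_by: Vec<String>, input: Box<Plan> },
    Sort { order_by: Vec<String>, input: Box<Plan> },
    SortPreservingMerge { order_by: Vec<String>, input: Box<Plan> },
    HashRepartition { keys: Vec<String>, input: Box<Plan> },
    Leaf(String),
}

const SERIES: &str = "sorted_series";

/// Rewrite FinalPartitioned -> Sort -> HashRepartition into
/// Final -> SortPreservingMerge -> Sort -> (original pre-hash child),
/// but only when sorted_series is a group key, leads the sort order,
/// and is one of the hash keys. Any other plan is returned unchanged.
fn rewrite(plan: &Plan) -> Plan {
    if let Plan::AggFinalPartitioned { group_by, input } = plan {
        if let Plan::Sort { order_by, input: sort_input } = input.as_ref() {
            if let Plan::HashRepartition { keys, input: child } = sort_input.as_ref() {
                let applies = group_by.iter().any(|g| g == SERIES)
                    && order_by.first().map(String::as_str) == Some(SERIES)
                    && keys.iter().any(|k| k == SERIES);
                if applies {
                    return Plan::AggFinal {
                        group_by: group_by.clone(),
                        input: Box::new(Plan::SortPreservingMerge {
                            order_by: order_by.clone(),
                            input: Box::new(Plan::Sort {
                                order_by: order_by.clone(),
                                input: child.clone(),
                            }),
                        }),
                    };
                }
            }
        }
    }
    plan.clone()
}
```

Keeping the match this narrow is what limits the rule to the sorted-series finalization pattern; the outer (service, time_bin) aggregate does not group by sorted_series and so falls through untouched.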

Future Considerations

Follow-up optimizations that are intentionally left out of this PR:

  • Remove or narrow the explicit SortExec if the partial aggregate can prove it preserves sufficient sorted-series ordering.
  • Use functional dependencies such as sorted_series -> tag columns to avoid redundant grouping/sorting work on tag columns when safe.
  • Decide when the outer (service, time_bin) aggregate should hash repartition versus coalesce to a final reducer, based on the reduced inner-output cardinality.
  • Explore range partitioning by sorted_series for parallel finalization without destroying sorted order.
  • For hotspot series, consider finer partitioning only when the query group key allows it, e.g. (sorted_series, time_bin).


@g-talbot g-talbot left a comment


LGTM, but maybe have Nga look at it too, as she was concerned about this.

@alexanderbianchi alexanderbianchi enabled auto-merge (squash) April 29, 2026 21:35
@alexanderbianchi alexanderbianchi merged commit 697ff0e into quickwit-oss:main Apr 29, 2026
5 checks passed
